跳到主要内容

NSQ 是什么

NSQ 是什么

NSQ(NSQ Messaging System) 是一种实时分布式消息传递平台,用于在分布式系统中进行可靠的实时消息传递。它由 Bitly(一家互联网公司)开发,用于处理大规模的消息流。

NSQ 的设计目标是提供简单易用、高性能和可靠的消息传递机制。它采用去中心化的架构,其中消息生产者将消息发送到NSQ集群,而消息消费者从集群中接收和处理消息。NSQ提供了低延迟、高吞吐量和水平可扩展的特性,适用于需要实时响应的应用场景。

NSQ 的关键特性包括:

  1. 分布式和去中心化架构:NSQ集群由多个节点组成,每个节点都是对等的,没有单点故障。
  2. 实时消息传递:NSQ专注于实时消息传递,能够以低延迟和高吞吐量处理消息。
  3. 可靠性:NSQ通过持久化消息和多副本复制机制提供可靠的消息传递,并支持消息重试和故障恢复。
  4. 水平可扩展:NSQ支持水平扩展,可以通过添加更多的节点来增加吞吐量和容量。
  5. 低配置和易用性:NSQ提供简单的命令行工具和易于理解的配置选项,使得使用和管理变得相对简单。

NSQ 是一种面向实时消息传递的分布式消息传递平台,具有高性能、可靠性和简单易用的特点,适用于需要实时处理消息的分布式系统。

NSQ 组件

NSQ 由 3 个守护进程组成:

  • nsqd 是接收、队列和传送消息到客户端的守护进程。
  • nsqlookupd 是管理的拓扑信息,并提供了最终一致发现服务的守护进程。
  • nsqadmin 是一个 Web UI 来实时监控集群(和执行各种管理任务)。

资源消耗

NSQ

进程启动时占用
nsqd9.2MB
nsqlookup8.5MB

Kafka

column0column1
进程启动时占用
kafka299MB
zookeeper58MB

运行与维护

\ NSQKafka
依赖Linux基础包、bash、jdk、java
耦合无!能以nsqd单进程提供完整服务,只在多节点分布式模式下需要nsqlookup依赖 zookeeper
日志标准输出,自行重定向zookeeper 1份日志,kafka 7份日志,其中两份日志按小时自动切割
配置10项左右,默认即是最优10多个独立配置文件,数百个配置项
性能优化默认开启 pprof。支持web可视化实时观测内存、协程等动态
异常排查错误日志中的栈,源码量小。不依赖网络问答也能在短时间内找出问题错误日志中的栈,深度的栈,巨量源码,排查需要深入了解其原理,大量阅读源码。否则只能通过互联网、查阅前人经验或大师级人脉。

NSQ 集群

虽然机器故障是小概率事件,但是也无法彻底避免,我们要提供高可用的服务,就必要考虑机器故障。NSQ 采用的是一个完全分布式的拓扑结构,非常适合构建起一个高可用的消息集群。

要实现高可用,有多个等级,比如部分兼容机器故障、兼容单机房故障、兼容地域主干网络故障等等。绝大部分的业务场景下,做到兼容部分机器故障是基本要求,这里我们主要总结下如何兼容部署机器故障,如果要做到更高的可用性部署拓扑也类似,就是多机房、多地域部署。

nsqd 的数量决定整个集群的吞吐能力,NSQ 具备非常好的消息处理性能,具体按实际业务量级决定部署 nsqd 节点规模,但考虑到可用性, 最少不要少于 3 个 nsqd 节点,同样也不要少于 3 个 nsdlookupd 节点。

示例部署拓扑:

启动 3 个 nsdlookupd,所有 nsqd 都连接到这三个 nsdlookupd。生成消息是直接通过负载均衡写入到 nsqd,如果那个 nsqd 挂掉,生产者就无法继续往这个 nsqd 写消息了。消费消息通过 nsdlookupd 做服务发现,配置 3 个 +nsdlookupd,其中某个挂掉不会影响消息消费,nsqd 仍然可以通过其他 nsdlookupd 被发现。nsqadmin 是个 web-ui 管理后台,无任何本地状态,单机部署多机部署都可以。

使用 Docker Compose 来搭建 NSQ 集群时,可以按照以下步骤进行操作:

  1. 创建一个名为 docker-compose.yml 的文件,并使用以下内容:
version: "3"
services:
nsqd1:
image: nsqio/nsq:v1.2.0
command: /nsqd --broadcast-address=nsqd1 --lookupd-tcp-address=lookupd:4160
ports:
- "4150:4150"
- "4151:4151"
networks:
- nsqnet
volumes:
- ./data/nsqd1:/data

nsqd2:
image: nsqio/nsq:v1.2.0
command: /nsqd --broadcast-address=nsqd2 --lookupd-tcp-address=lookupd:4160
ports:
- "4250:4150"
- "4251:4151"
networks:
- nsqnet
volumes:
- ./data/nsqd2:/data

lookupd:
image: nsqio/nsq:v1.2.0
command: /nsqlookupd
ports:
- "4160:4160"
- "4161:4161"
networks:
- nsqnet

networks:
nsqnet:

上述配置使用了 nsqio/nsq:v1.2.0 镜像,并创建了三个服务:nsqd1nsqd2lookupd。其中,nsqd1nsqd2 是两个 NSQD 实例,lookupd 是 NSQ 的查找服务。

  1. 创建一个名为 docker-compose.override.yml 的文件,并使用以下内容:
version: "3"
services:
nsqd1:
environment:
- NSQ_LOOKUPD_TCP_ADDRESS=lookupd:4160
- NSQ_NSQD_TCP_ADDRESS=0.0.0.0:4150
- NSQ_HTTP_ADDRESS=0.0.0.0:4151

nsqd2:
environment:
- NSQ_LOOKUPD_TCP_ADDRESS=lookupd:4160
- NSQ_NSQD_TCP_ADDRESS=0.0.0.0:4150
- NSQ_HTTP_ADDRESS=0.0.0.0:4151

上述配置为每个 NSQD 实例设置了环境变量,指定了各自的 NSQD 和 HTTP 地址,并告知 NSQD 实例连接到 lookupd 服务。

  1. 在命令行中进入包含上述两个文件的目录,然后运行以下命令启动 NSQ 集群:
docker-compose up -d

这将启动三个容器:两个 NSQD 实例和一个 lookupd 实例。

  1. 现在,你可以通过以下地址访问 NSQD 实例和 lookupd 实例:
  • NSQD1: 127.0.0.1:4150 (TCP) 和 127.0.0.1:4151 (HTTP)
  • NSQD2: 127.0.0.1:4250 (TCP) 和 127.0.0.1:4251 (HTTP)
  • lookupd: 127.0.0.1:4160 (TCP) 和 127.0.0.1:4161 (HTTP)

现在,你已经成功搭建了一个简单的 NSQ 集群,其中包含两个 NSQD 实例和一个 lookupd 实例。你可以使用这个集群来发送和接收消息,并享受 NSQ 的分布式消息处理能力。记得根据实际需求进行配置和调整,例如添加更多的 NSQD 实例或使用负载均衡器来平衡流量。

然后访问的时候是使用 lookupd 去取得具体的实例地址

consumer, err := nsq.NewConsumer(topic, "my_channel", config)
consumer.ConnectToNSQLookupds([]string{"lookupd:4160"})

在这种配置下,消费者将连接到 lookupd 实例,并从它那里获取可用的 NSQD 实例列表。如果某个 NSQD 实例不可用,消费者会自动切换到其他可用的 NSQD 实例来接收消息。

使用 Docker 部署服务

# docker-compose.yml
version: '3'
services:
nsqlookupd:
image: nsqio/nsq
command: /nsqlookupd
ports:
- "4160"
- "31001:4161"
nsqd:
image: nsqio/nsq
command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
depends_on:
- nsqlookupd
ports:
- "31041:4150"
- "4151"
nsqadmin:
image: nsqio/nsq
command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
depends_on:
- nsqlookupd
ports:
- "31011:4171"
提示

NSQD 默认监听在 TCP 端口 4150 上。NSQD 是 NSQ 的守护进程,它负责接收、存储和分发消息。当生产者通过 NSQ 库将消息发送到 NSQD 时,它们会连接到 NSQD 的 4150 端口。同样,消费者通过连接到 NSQD 的 4150 端口来消费消息。

需要注意的是,4150 端口只是 NSQD 默认的监听端口,可以在配置文件中自定义 NSQD 的监听端口。在上述示例中,如果在 NSQD 配置中将监听端口更改为其他值,需要相应地更新生产者和消费者代码中的端口。(例如上面的例子就映射成了 31041)

docker-compose up -d

最后检查是否成功启动了

curl http://127.0.0.1:31001/ping

范围 http://127.0.0.1:31011/ 来访问 admin UI 界面

在 Golang 上消费和发送消息

如下例子

package main

import (
"fmt"
"log"
"time"

"github.com/nsqio/go-nsq"
)

// 消费者处理消息的回调函数
type MessageHandler struct{}

func (h *MessageHandler) HandleMessage(message *nsq.Message) error {
fmt.Printf("Received message: %s\n", message.Body)
return nil
}

func main() {
// 创建一个生产者
producer, err := nsq.NewProducer("localhost:31041", nsq.NewConfig())
if err != nil {
log.Fatal(err)
}

// 向指定主题发送消息
topic := "my_topic"

// 创建一个消费者
config := nsq.NewConfig()
consumer, err := nsq.NewConsumer(topic, "my_channel", config)
if err != nil {
log.Fatal(err)
}

// 设置消息处理函数
consumer.AddHandler(&MessageHandler{})

go func() {
for i := 0; i < 100; i++ {
err = producer.Publish(topic, []byte(fmt.Sprintf("Hello, NSQ!, %d", i)))
time.Sleep(10 * time.Millisecond)
if err != nil {
log.Fatal(err)
}
}
consumer.Stop()
}()

// 连接到NSQD
err = consumer.ConnectToNSQD("localhost:31041")
if err != nil {
log.Fatal(err)
}

// 运行消费者,直到接收到中断信号
<-consumer.StopChan

// 关闭生产者连接
producer.Stop()
}

输出:

2023/06/14 09:03:34 INF    1 (localhost:31041) connecting to nsqd
2023/06/14 09:03:34 INF 2 [my_topic/my_channel] (localhost:31041) connecting to nsqd
Received message: Hello, NSQ!, 1
.....
Received message: Hello, NSQ!, 98
Received message: Hello, NSQ!, 99
2023/06/14 09:10:54 INF 2 [my_topic/my_channel] stopping...
2023/06/14 09:10:54 INF 2 [my_topic/my_channel] (localhost:31041) received CLOSE_WAIT from nsqd
2023/06/14 09:10:54 INF 2 [my_topic/my_channel] (localhost:31041) beginning close
2023/06/14 09:10:54 INF 2 [my_topic/my_channel] (localhost:31041) readLoop exiting
2023/06/14 09:10:54 INF 2 [my_topic/my_channel] (localhost:31041) breaking out of writeLoop
2023/06/14 09:10:54 INF 2 [my_topic/my_channel] (localhost:31041) writeLoop exiting
2023/06/14 09:10:54 INF 2 [my_topic/my_channel] (localhost:31041) finished draining, cleanup exiting
2023/06/14 09:10:54 INF 2 [my_topic/my_channel] (localhost:31041) clean close complete
2023/06/14 09:10:54 WRN 2 [my_topic/my_channel] there are 0 connections left alive
2023/06/14 09:10:54 INF 2 [my_topic/my_channel] stopping handlers
2023/06/14 09:10:54 INF 2 [my_topic/my_channel] rdyLoop exiting
2023/06/14 09:10:54 INF 1 (localhost:31041) stopping
2023/06/14 09:10:54 INF 1 (localhost:31041) exiting router

nsq.NewConsumer 第二个入参

在 NSQ 中,消费者通过订阅主题和通道来接收消息。nsq.NewConsumer 的第二个参数是通道名称(channel name),它用于标识消费者所属的通道。

在 NSQ 中,每个主题可以有多个通道,每个通道都是独立的消息消费者组。通过使用不同的通道名称,可以实现消息的负载均衡和并行处理。每个通道中的消费者将独立地接收和处理主题中的消息,不同通道的消费者之间不会互相干扰。

当多个消费者同时订阅同一个主题时,如果它们使用相同的通道名称,那么它们将共享接收到的消息。每条消息只会被同一个通道中的一个消费者处理。这种方式适用于实现消息的负载均衡和水平扩展。

如果你希望每个消费者都独立地接收主题中的所有消息,可以为每个消费者使用不同的通道名称。这样,每个通道中的消费者将独立处理消息,实现并行处理的能力。

示例代码中的 nsq.NewConsumer 的第二个参数就是通道名称,你可以根据需要选择合适的通道名称来实现你的消费者逻辑。

consumer, err := nsq.NewConsumer("my_topic", "my_channel", config)

在上述代码中,消费者使用 my_channel 作为通道名称,用于接收 my_topic 主题中的消息。你可以根据需要将通道名称更改为你自己的名称,确保消费者逻辑的正确运行。

References